/*chunk table data structure */
#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "table_proto.h"
#include "mem_cache.h"
#include "../chunk/chunk_proto.h"
#include "lich_md.h"
#include "ylog.h"
#include "dbg.h"

typedef struct {
        uint32_t size;
        uint32_t crc;
        char buf[0];
} infohead_t;

#pragma pack(4)

typedef struct {
        uint16_t size;
        uint16_t magic;
        uint32_t crc;
        char buf[0];
} record_t;

#pragma pack()

#define ITEM_MAGIC  0xa9f0

STATIC int __table_proto_create(table_proto_t **_table_proto, const chkinfo_t *chkinfo,
                                const chkstat_t *chkstat, int item_size, int item_count);
STATIC int __table_proto_write__(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                                 const char *_buf, const buffer_t *mbuf, uint32_t size, uint32_t offset, vclock_t *vclock)
{
        int ret;
        buffer_t wbuf, *buf;
        io_t io;
        char tmp[MAX_BUF_LEN];

        ANALYSIS_BEGIN(0);

        YASSERT(size + offset <= LICH_CHUNK_SPLIT);
        YASSERT(chkinfo->id.type != __RAW_CHUNK__);

        if (_buf) {
                buf = &wbuf;
                mbuffer_init(buf, 0);

                ret = mbuffer_copy(buf, _buf, size);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                buf = (buffer_t *)mbuf;
        }

        CHKINFO_STR(chkinfo, tmp);
        DBUG("chunk "CHKID_FORMAT" offset %u size %u clock %llu, info %s\n",
              CHKID_ARG(&chkinfo->id), offset, size, (LLU)clock, tmp);

        io_init(&io, &chkinfo->id, vclock, offset, size, 0);
        ret = chunk_proto_rep_write(chkinfo, chkstat, NULL, &io, buf, NULL);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ANALYSIS_END(0, 1000 * 5000, NULL);

        if (_buf)
                mbuffer_free(buf);

        return 0;
err_free:
        if (_buf)
                mbuffer_free(buf);
err_ret:
        return ret;
}

STATIC int __table_proto_read__(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                                char *_buf, buffer_t *mbuf, uint32_t size, uint32_t offset)
{
        int ret;
        buffer_t rbuf, *buf;
        io_t io;

        ANALYSIS_BEGIN(0);

        YASSERT(size + offset <= LICH_CHUNK_SPLIT);
        YASSERT(chkinfo->id.type != __RAW_CHUNK__);

        if (_buf) {
                buf = &rbuf;
                mbuffer_init(buf, 0);
        } else {
                buf = mbuf;
        }

        io_init(&io, &chkinfo->id, NULL, offset, size, 0);
        ret = chunk_proto_rep_read(chkinfo, chkstat, NULL, &io, buf, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        if (chkinfo->id.id == 2 && chkinfo->id.type == __POOL_CHUNK__ && size == 4096) {
                ret = _set_value("/tmp/xxxx.read", mbuffer_head(&buf), size, O_CREAT | O_TRUNC);
                DINFO("ret %u, size %u, offset %u\n", ret, buf.len, offset);
        }
#endif

        ANALYSIS_END(0, 1000 * 5000, NULL);

        YASSERT(buf->len == size);

        if (_buf) {
                mbuffer_get(buf, _buf, size);
                mbuffer_free(buf);
        }

        return 0;
err_ret:
        return ret;
}

int __table_proto_write(const chkinfo_t *chkinfo, chkstat_t *chkstat, const char *buf,
                        int size, int offset)
{
        int ret;
        vclock_t vclock;

        chunk_proto_clock(chkinfo, chkstat, &vclock.clock, __OP_WRITE);
        vclock.vfm = 0;
        
        ret = __table_proto_write__(chkinfo, chkstat, buf, NULL, size, offset, &vclock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        chunk_proto_reset(chkinfo, chkstat);
        return ret;
}

int table_proto_write(const chkinfo_t *chkinfo, chkstat_t *chkstat, const buffer_t *buf,
                        int size, int offset)
{
        int ret;
        vclock_t vclock;

        chunk_proto_clock(chkinfo, chkstat, &vclock.clock, __OP_WRITE);
        vclock.vfm = 0;

        ret = __table_proto_write__(chkinfo, chkstat, NULL, buf, size, offset, &vclock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        chunk_proto_reset(chkinfo, chkstat);
        return ret;
}

int __table_proto_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat, char *buf,
                              int size, int offset, int *_size)
{
        int ret;

        YASSERT(offset < (int)LICH_CHUNK_SPLIT);
        if (offset + size > (int)LICH_CHUNK_SPLIT) {
                size = LICH_CHUNK_SPLIT - offset;
        }

        ret = __table_proto_read__(chkinfo, chkstat, buf, NULL, size, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_size)
                *_size = size;

        return 0;
err_ret:
        return ret;
}

int table_proto_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat, buffer_t *buf,
                              int size, int offset, int *_size)
{
        int ret;

        YASSERT(offset < (int)LICH_CHUNK_SPLIT);
        if (offset + size > (int)LICH_CHUNK_SPLIT) {
                size = LICH_CHUNK_SPLIT - offset;
        }

        ret = __table_proto_read__(chkinfo, chkstat, NULL, buf, size, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_size)
                *_size = size;

        return 0;
err_ret:
        return ret;
}

#if 1
STATIC int __table_proto_write_item(table_proto_t *table_proto, const char *item, int size, int loc)
{
        int ret;
        char buf[MAX_BUF_LEN];
        record_t *record;

        YASSERT(size + sizeof(*record) < table_proto->item_size);
        YASSERT(size + sizeof(*record) < MAX_BUF_LEN);

        record = (void *)buf;
        record->crc = crc32_sum(item, size);
        record->size = size;
        record->magic = ITEM_MAGIC;
        memcpy(record->buf, item, size);

        ret = __table_proto_write(table_proto->chkinfo, table_proto->chkstat, (void *)record,
                                   size + sizeof(*record), loc * table_proto->item_size + TABLE_PROTO_ITEM);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_read_items(table_proto_t *table_proto, char *item, int from, int to, int *count)
{
        int ret, size, newsize, i;
        char *buf;
        record_t *record;

        YASSERT(from * table_proto->item_size + TABLE_PROTO_ITEM < (int)LICH_CHUNK_SPLIT);

        size = (to - from) * table_proto->item_size;

        ret = ymalloc((void **)&buf, size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret =  __table_proto_read(table_proto->chkinfo, table_proto->chkstat, buf, size,
                                  from * table_proto->item_size + TABLE_PROTO_ITEM, &newsize);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        for (i = 0; i < (to - from); i++) {
                record = (void *)buf + i * table_proto->item_size;

                if (bmap_get(&table_proto->bmap, from + i) == 0)
                        continue;

                if (record->magic == ITEM_MAGIC) {
                        DBUG("crc [%u] %u len %u\n", i, record->crc, record->size);
                        YASSERT(record->size);
                        YASSERT(crc32_sum(record->buf, record->size) == record->crc);
                        memcpy((item +  i * table_proto->item_size), record->buf, record->size);
                } else {
                        DERROR("chkid "CHKID_FORMAT" i %u magic %d size %d crc %d\n",
                               CHKID_ARG(&table_proto->chkid), i, record->magic, record->size, record->crc);
                        UNIMPLEMENTED(__DUMP__);
                        memcpy((item +  i * table_proto->item_size), (void *)record, table_proto->item_size);
                }
        }

        YASSERT(from * table_proto->item_size + TABLE_PROTO_ITEM + ret <= (int)LICH_CHUNK_SPLIT);

        *count = newsize / table_proto->item_size;

        yfree((void **)&buf);

        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

#else

STATIC int __table_proto_write_item(table_proto_t *table_proto, const char *item, int size, int loc)
{
        return __table_proto_write(table_proto->chkinfo, table_proto->chkstat, item, size,
                                   loc * table_proto->item_size + TABLE_PROTO_ITEM);
}

STATIC int __table_proto_read_items(table_proto_t *table_proto, char *buf, int from, int to, int *count)
{
        int ret, size, newsize;

        YASSERT(from * table_proto->item_size + TABLE_PROTO_ITEM < (int)LICH_CHUNK_SPLIT);

        size = (to - from) * table_proto->item_size;
        ret =  __table_proto_read(table_proto->chkinfo, table_proto->chkstat, buf, size,
                                  from * table_proto->item_size + TABLE_PROTO_ITEM, &newsize);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(from * table_proto->item_size + TABLE_PROTO_ITEM + ret <= (int)LICH_CHUNK_SPLIT);

        *count = newsize / table_proto->item_size;

        return 0;
err_ret:
        return ret;
}

#endif

STATIC int __table_proto_write_map(table_proto_t *table_proto, const char *map, int _size)
{
        int ret;
        char *buf;
        int size, offset;

#ifdef HAVE_STATIC_ASSERT
        static_assert(TABLE_PROTO_MAP_AREA  <= PAGE_SIZE, "map");
#endif

        YASSERT(_size <= TABLE_PROTO_MAP_AREA);
        buf = mem_cache_calloc(MEM_CACHE_4K, 1);

        size = TABLE_PROTO_MAP_AREA;
        offset = TABLE_PROTO_MAP;

        if (_size < size) {
                memcpy(buf, map, _size);
                ret =  __table_proto_write(table_proto->chkinfo, table_proto->chkstat, buf, size, offset);
        } else {
                ret =  __table_proto_write(table_proto->chkinfo, table_proto->chkstat, map, size, offset);
        }
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __table_proto_read_map(table_proto_t *table_proto, char *map)
{
        int offset, size;

        size = TABLE_PROTO_MAP_AREA;
        offset = TABLE_PROTO_MAP;

        return __table_proto_read(table_proto->chkinfo, table_proto->chkstat,
                                  map, size, offset, NULL);
}

STATIC int __table_proto_setinfo__(table_proto_t *table_proto, const void *info,
                                 int size, int info_type)
{
        int ret;
        char buf[LICH_BLOCK_SIZE];
        infohead_t *head;

        YASSERT(info_type < TABLE_PROTO_INFO_MAX);
        YASSERT(sizeof(infohead_t) + size <= LICH_BLOCK_SIZE);

        if (size + sizeof(infohead_t) > TABLE_PROTO_INFO_AREA) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        head = (void *)buf;
        head->size = size;
        head->crc = crc32_sum(info, size);
        memcpy(head->buf, info, size);

        DBUG("size %u crc %u\n", head->size, head->crc);

        ret = __table_proto_write(table_proto->chkinfo, table_proto->chkstat,
                                  buf, LICH_BLOCK_SIZE,
                                  TABLE_PROTO_INFO + info_type * LICH_BLOCK_SIZE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_setinfo(table_proto_t *table_proto, const void *info,
                                 int size, int info_type)
{
        YASSERT(info_type < TABLE_PROTO_INFO_MAX);
        //YASSERT(info_type != TABLE_PROTO_INFO_MAGIC);

        return __table_proto_setinfo__(table_proto, info, size, info_type);
}

STATIC int __table_proto_getinfo(table_proto_t *table_proto, void *info,
                                 int *len, int info_type)
{
        int ret;
        char buf[LICH_BLOCK_SIZE];
        infohead_t *head;

        YASSERT(info_type < TABLE_PROTO_INFO_MAX);
        YASSERT(*len <= LICH_BLOCK_SIZE);

        ret =  __table_proto_read(table_proto->chkinfo, table_proto->chkstat,
                                  buf, LICH_BLOCK_SIZE,
                                  TABLE_PROTO_INFO + info_type * LICH_BLOCK_SIZE, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        head = (void *)buf;
        if (head->size) {
                if (len) {
                        YASSERT((int)head->size <= *len);
                        *len = head->size;
                }

                YASSERT(head->crc == crc32_sum(head->buf, head->size));
                DBUG("size %u crc %u\n", head->size, head->crc);

                memcpy(info, head->buf, head->size);
        } else {
                DBUG("load "CHKID_FORMAT" no date\n", CHKID_ARG(&table_proto->chkid));

                if (len)
                        *len = 0;
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_commit(table_proto_t *table_proto, int loc, const void *value,
                                int len, int newitem)
{
        int ret;

        ret = __table_proto_write_item(table_proto, value, len, loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (newitem) {
                ret = bmap_set(&table_proto->bmap, loc);
                if (unlikely(ret))
                        YASSERT(0);

                DBUG("commit at loc %u offset %u\n", loc,
                     loc * table_proto->item_size);

                ret = __table_proto_write_map(table_proto, table_proto->bmap.bits,
                                              (table_proto->item_count / CHAR_BIT));
                if (unlikely(ret)) {
                        bmap_del(&table_proto->bmap, loc);
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * @param table_proto
 * @param loc offset in table_proto
 * @param value
 * @return
 */
static int __table_proto_insert(table_proto_t *table_proto,
                                uint32_t loc, const void *value, int len)
{
        int ret;

        YASSERT(loc < table_proto->item_count);

        DBUG("insert into "CHKID_FORMAT" loc %u\n", CHKID_ARG(&table_proto->chkid), loc);

        ret = __table_proto_commit(table_proto, loc, value, len, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_del(table_proto_t *table_proto, uint32_t loc)
{
        int ret;

        ret = bmap_del(&table_proto->bmap, loc);
        if (unlikely(ret))
                YASSERT(0);

        ret = __table_proto_write_map(table_proto, table_proto->bmap.bits,
                                      (table_proto->item_count / CHAR_BIT));
        if (unlikely(ret)) {
                bmap_set(&table_proto->bmap, loc);
                GOTO(err_ret, ret);
        }

        DBUG("table "CHKID_FORMAT" count %u\n",
              CHKID_ARG(&table_proto->chkid), table_proto->bmap.nr_one);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_batch_del(table_proto_t *table_proto, uint32_t *loc, uint32_t n_loc)
{
        int ret;

        for(int i=0;i<n_loc;i++){
                ret = bmap_del(&table_proto->bmap, loc[i]);
                if (unlikely(ret))
                        YASSERT(0); 
        }

        ret = __table_proto_write_map(table_proto, table_proto->bmap.bits,
                                      (table_proto->item_count / CHAR_BIT));
        if (unlikely(ret)) {
                for(int i=0;i<n_loc;i++){
                        ret = bmap_set(&table_proto->bmap, loc[i]);
                        if (unlikely(ret))
                                YASSERT(0); 
                }
                GOTO(err_ret, ret);
        }

        DBUG("table "CHKID_FORMAT" count %u\n",
              CHKID_ARG(&table_proto->chkid), table_proto->bmap.nr_one);

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_get_empty(table_proto_t *table_proto, uint32_t *_loc)
{
        int ret, loc;

        loc = bmap_get_empty(&table_proto->bmap);
        if (loc == -1) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        *_loc = loc;

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_loadmap(table_proto_t *table_proto)
{
        int ret;
        char *buf;

        memset(&table_proto->bmap, 0x0, sizeof(table_proto->bmap));

        ret = ymalloc((void **)&buf, TABLE_PROTO_MAP_AREA);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_read_map(table_proto, buf);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        bmap_load(&table_proto->bmap, buf, (table_proto->item_count / CHAR_BIT));

        DBUG("load map of table "CHKID_FORMAT" count %llu\n",
             CHKID_ARG(&table_proto->chkid), (LLU)table_proto->bmap.nr_one);

        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

STATIC int __table_proto_loaditem(table_proto_t *table_proto, const char *buf,
                                  int from, int _to, int (*iterator)(const void *, int, int, void *), int idx, void *context)
{
        int ret, loc, to;
        const void *value;
        time_t begin, now;

        
        YASSERT(from < (int)table_proto->bmap.size);
        to = _to > (int)table_proto->bmap.size ? (int)table_proto->bmap.size : _to;

        begin = gettime();
        for (loc = from; loc < to; loc++) {
                if (bmap_get(&table_proto->bmap, loc) == 0)
                        continue;

                value = (void *)buf + (loc - from) * table_proto->item_size;

                //DBUG("load value '%s' at loc %u offset %u\n", value, loc, offset +  i * table_proto->item_size);
                //DBUG("real loc %u\n", offset + TABLE_PROTO_MAP_AREA + i * table_proto->item_size);

                ret = iterator(value, loc, idx, context);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                now = gettime();
                if (now - begin > gloconf.rpc_timeout) {
                        DWARN("load table "CHKID_FORMAT" loc %u  timeout\n",
                              CHKID_ARG(&table_proto->chkid), loc);
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}


STATIC int __table_proto_load(table_proto_t *table_proto)
{
        int ret;

        if (table_proto->map_loaded)
                goto out;
        
        ret = __table_proto_loadmap(table_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        if (table_proto->_xattr) {
                YASSERT((table_proto->item_count * table_proto->item_size) < (int)(LICH_CHUNK_SPLIT / 2));

                ret = table_xattr_init(table_proto);
                if (unlikely(ret)) {
                        GOTO(err_map, ret);
                }
        }

        if (table_proto->_snap) {
                YASSERT((table_proto->item_count * table_proto->item_size) < (int)(LICH_CHUNK_SPLIT / 2));

                ret = table_snap_init(table_proto);
                if (unlikely(ret)) {
                        GOTO(err_xattr, ret);
                }
        }

        table_proto->map_loaded = 1;
        DBUG("load table "CHKID_FORMAT"\n", CHKID_ARG(&table_proto->chkid));

out:
        return 0;
err_xattr:
        table_xattr_destroy(table_proto);
err_map:
        bmap_destroy(&table_proto->bmap);
err_ret:
        return ret;
}

#define ITEM_COUNT 1000

STATIC int __table_proto_iterator(table_proto_t *table_proto,
                                  int (*iterator)(const void *, int, int, void *), int idx, void *context)
{
        int ret, loc, buf_count, count;
        //char buf[MAX_BUF_LEN];
        char *buf = NULL;

        ret = table_proto->load(table_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&buf, table_proto->item_size * (int)ITEM_COUNT);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        loc = 0;
        buf_count = ITEM_COUNT;
        /* notes:item_count 16000, memcpy & iterator about used 160 * 1000 us, so yield for each 1000 items in __table_proto_read */
        while (loc < (int)table_proto->item_count) {
                if (bmap_range_empty(&table_proto->bmap, loc, loc + buf_count)) {
                        loc += buf_count;
                        continue;
                }

                ret = __table_proto_read_items(table_proto, buf, loc, loc + buf_count, &count);
                if (unlikely(ret)) {
                        if (ret == ENONET) {
                                ret = EAGAIN;
                        }
                        GOTO(err_free, ret);
                }

                //DBUG("load table %s offset %u size %u\n",
                //CHKID_ARG(&table_proto->chkid), offset + TABLE_PROTO_MAP_AREA, ret);

                if (count == 0)
                        break;

                ret = __table_proto_loaditem(table_proto, buf, loc, loc + count, iterator, idx, context);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                loc += count;
        }

        table_proto->item_loaded = 1;
        yfree((void **)&buf);

        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

STATIC int __table_proto_update(table_proto_t *table_proto, uint32_t loc,
                                const void *value, int len)
{
        int ret;

        ret = __table_proto_commit(table_proto, loc, value, len, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC void __table_proto_unload(table_proto_t *table_proto)
{
        table_xattr_destroy(table_proto);
        table_snap_destroy(table_proto);
        bmap_destroy(&table_proto->bmap);
}

void table_proto_destroy(table_proto_t *table_proto)
{
        __table_proto_unload(table_proto);

        yfree((void **)&table_proto);
}

int table_proto_init(table_proto_t **_table_proto, const chkinfo_t *chkinfo,
                     const chkstat_t *chkstat, int item_size, int item_count, int xattr, int snap)
{
        int ret;
        table_proto_t *table_proto;

        ANALYSIS_BEGIN(0);

        //CHKINFO_DUMP(chkinfo, D_INFO);
        ret = __table_proto_create(&table_proto, chkinfo, chkstat, item_size,
                                   item_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        table_proto->_xattr = xattr;
        table_proto->_snap = snap;

        *_table_proto = table_proto;

        ANALYSIS_END(0, IO_WARN, id2str(&chkinfo->id));

        return 0;
//err_free:
//        yfree((void **)&table_proto);
err_ret:
        return ret;
}

int table_proto_load(table_proto_t **_table_proto, const chkinfo_t *chkinfo,
                     const chkstat_t *chkstat, int item_size, int item_count, int xattr, int snap)
{
        int ret;
        table_proto_t *table_proto;

        ANALYSIS_BEGIN(0);

        //CHKINFO_DUMP(chkinfo, D_INFO);
        ret = table_proto_init(&table_proto, chkinfo, chkstat, item_size,
                               item_count, xattr, snap);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table_proto->load(table_proto);
        if (unlikely(ret))
                GOTO(err_free, ret);

        *_table_proto = table_proto;

        ANALYSIS_END(0, IO_WARN, id2str(&chkinfo->id));

        return 0;
err_free:
        table_proto_destroy(table_proto);
err_ret:
        return ret;
}

/**
 *
 * @param chkinfo
 * @param parent
 * @param parentnid
 * @param initzero
 * @param info
 * @param size
 * @param data
 * @param data_size
 * @return
 */
int table_proto_create(const char *pool, const chkinfo_t *chkinfo, const fileid_t *parent,
                       const nid_t *parentnid, int initzero, const void *info, int size, char *data, int data_size)
{
        int ret, i;
        char buf[LICH_BLOCK_SIZE];
        infohead_t *head;
        buffer_t tmp;
        nid_t nid[LICH_REPLICA_MAX];

        if (size + sizeof(*head) > TABLE_PROTO_INFO_AREA) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        head = (void *)buf;
        head->size = size;
        head->crc = crc32_sum(info, size);
        memcpy(head->buf, info, size);

        mbuffer_init(&tmp, 0);
        if (data) {
                YASSERT(data_size > LICH_BLOCK_SIZE);

                memcpy(data, buf, sizeof(buf));
                mbuffer_copy(&tmp, (void *)data, data_size);
        } else {
                mbuffer_copy(&tmp, (void *)head, sizeof(*head) + size);
        }

        DBUG("init "CHKID_FORMAT" zero %u data %u\n",
             CHKID_ARG(&chkinfo->id), initzero, tmp.len);

        for (i = 0; i < chkinfo->repnum; i++) {
                nid[i] = chkinfo->diskid[i].id;
        }

#if ENABLE_CHUNK_DEBUG
        chkinfo_rack_check(chkinfo);
#endif

        ret = chunk_proto_rep_create(pool, &chkinfo->id, 1, nid, chkinfo->repnum,
                                     parent, parentnid, initzero, chkinfo->info_version, &tmp, NULL);
        if (unlikely(ret))
                GOTO(err_free, ret);

        mbuffer_free(&tmp);

        return 0;
err_free:
        mbuffer_free(&tmp);
err_ret:
        return ret;
}


STATIC int __table_proto_create(table_proto_t **_table_proto, const chkinfo_t *chkinfo,
                                const chkstat_t *chkstat, int item_size, int item_count)
{
        int ret;
        table_proto_t *table_proto;

        table_proto_layout_check(item_size, item_count);

        ret = ymalloc((void **)&table_proto, sizeof(*table_proto));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(table_proto, 0x0, sizeof(*table_proto));

        table_proto->insert = __table_proto_insert;
        table_proto->del = __table_proto_del;
        table_proto->batch_del = __table_proto_batch_del;
        table_proto->update = __table_proto_update;
        table_proto->iterator = __table_proto_iterator;
        table_proto->load = __table_proto_load;

        table_proto->getinfo = __table_proto_getinfo;
        table_proto->setinfo = __table_proto_setinfo;

        table_proto->get_empty = __table_proto_get_empty;

        table_proto->item_count = item_count;
        table_proto->item_size = item_size;

        table_proto->chkinfo = (void *)table_proto->__chkinfo__;
        table_proto->chkstat = (void *)table_proto->__chkstat__;
        table_proto->chkid = chkinfo->id;

        CHKINFO_CP(table_proto->chkinfo, chkinfo);
        CHKSTAT_CP(table_proto->chkstat, chkstat, chkinfo->repnum);

        *_table_proto = table_proto;

        return 0;
err_ret:
        return ret;
}
